package com.tpvlog.rpc.consumer;

import com.tpvlog.rpc.core.RpcRequest;
import com.tpvlog.rpc.core.ServiceMeta;
import com.tpvlog.rpc.core.utils.RpcServiceHelper;
import com.tpvlog.rpc.protocol.RpcProtocol;
import com.tpvlog.rpc.protocol.codec.RpcDecoder;
import com.tpvlog.rpc.protocol.codec.RpcEncoder;
import com.tpvlog.rpc.protocol.handler.RpcResponseHandler;
import com.tpvlog.rpc.registry.RegistryService;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Netty-based RPC client: resolves a provider for a request through a
 * {@link RegistryService}, opens a connection to it and writes the request.
 *
 * <p>A single {@link Bootstrap} and {@link EventLoopGroup} are created in the
 * constructor and reused by every {@link #sendRequest} call. Call
 * {@link #shutdown()} when the consumer is no longer needed.
 */
public class RpcConsumer {

    private static final Logger LOG = LoggerFactory.getLogger(RpcConsumer.class);

    private final Bootstrap bootstrap;
    private final EventLoopGroup eventLoopGroup;

    /**
     * Creates the consumer with a 4-thread NIO event loop group and a channel
     * pipeline of {@link RpcEncoder}, {@link RpcDecoder} and {@link RpcResponseHandler}.
     */
    public RpcConsumer() {
        this.eventLoopGroup = new NioEventLoopGroup(4);
        this.bootstrap = new Bootstrap();
        this.bootstrap.group(eventLoopGroup).channel(NioSocketChannel.class)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline()
                                .addLast(new RpcEncoder())
                                .addLast(new RpcDecoder())
                                .addLast(new RpcResponseHandler());
                    }
                });
    }

    /**
     * Looks up a provider for the request, connects to it and writes the request.
     *
     * <p>The method blocks until the connection attempt completes; the write
     * itself is asynchronous and its failure is only logged.
     *
     * @param protocol        the protocol message whose body is the request to send
     * @param registryService registry used to discover a provider instance
     * @throws RuntimeException if the registry returns no provider for the service key
     * @throws Exception        if connecting to the provider fails or the wait is interrupted
     */
    public void sendRequest(RpcProtocol<RpcRequest> protocol, RegistryService registryService) throws Exception {
        // 1. Pick a service provider instance.
        RpcRequest request = protocol.getBody();
        Object[] params = request.getParams();
        String serviceKey = RpcServiceHelper.buildServiceKey(request.getClassName(), request.getServiceVersion());
        // Hash on the first argument when one is available; fall back to the service key
        // for no-arg calls, a missing params array, or a null first argument.
        boolean hasFirstParam = params != null && params.length > 0 && params[0] != null;
        int invokerHashCode = hasFirstParam ? params[0].hashCode() : serviceKey.hashCode();
        final ServiceMeta serviceMetadata = registryService.discovery(serviceKey, invokerHashCode);
        if (serviceMetadata == null) {
            throw new RuntimeException("No service provider founded:" + request);
        }

        // 2. Send the request over Netty.
        ChannelFuture connectFuture = bootstrap.connect(serviceMetadata.getAddress(), serviceMetadata.getPort());
        // Attach the listener BEFORE waiting: sync() rethrows on failure, so a listener
        // added afterwards would never observe a failed connect.
        connectFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture completed) throws Exception {
                if (completed.isSuccess()) {
                    LOG.info("connect rpc server {} on port {} success.", serviceMetadata.getAddress(), serviceMetadata.getPort());
                } else {
                    // Do not shut down the shared event loop group here: one unreachable
                    // provider must not disable the consumer for all later requests.
                    LOG.error("connect rpc server {} on port {} failed.",
                            serviceMetadata.getAddress(), serviceMetadata.getPort(), completed.cause());
                }
            }
        });
        connectFuture.sync();

        connectFuture.channel().writeAndFlush(protocol).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture written) throws Exception {
                if (!written.isSuccess()) {
                    LOG.error("send rpc request to {} on port {} failed.",
                            serviceMetadata.getAddress(), serviceMetadata.getPort(), written.cause());
                    // A channel whose write failed is not reused by this class; release it.
                    written.channel().close();
                }
            }
        });
    }

    /**
     * Starts a graceful shutdown of the event loop group owned by this consumer.
     * The consumer must not be used to send requests afterwards.
     */
    public void shutdown() {
        eventLoopGroup.shutdownGracefully();
    }
}
